/**
 * 
 */
package com.douyu.ocean.connect.router.service.impl;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;

import org.apache.commons.lang3.StringUtils;
import org.ocean.connect.common.constant.Constant;
import org.ocean.connect.router.api.enums.DeviceDomainEnum;
import org.ocean.connect.router.api.model.DefaultCCMessage;
import org.ocean.connect.router.api.model.TianxiaoConnectSession;
import org.ocean.connect.router.api.util.TxCcSessionKeyUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
import org.springframework.web.client.RestTemplate;
import org.springframework.web.context.request.async.DeferredResult;

import com.alibaba.dubbo.common.utils.CollectionUtils;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.baijia.commons.cache.CacheUtil;
import com.douyu.ocean.connect.router.result.BaseApiResult;
import com.douyu.ocean.connect.router.result.BaseResult;
import com.douyu.ocean.connect.router.service.MessageSendService;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.mashape.unirest.http.HttpResponse;
import com.mashape.unirest.http.JsonNode;
import com.mashape.unirest.http.Unirest;
import com.mashape.unirest.http.async.Callback;
import com.mashape.unirest.http.exceptions.UnirestException;

import lombok.extern.slf4j.Slf4j;

/**
 * @author leiruiqi
 *
 */
@Service("messageSendService")
@Slf4j
public class MessageSendServiceImpl implements MessageSendService {

	@Value("${route.cc.url.send}")
	private String sendMessageApiUrl = "/sendMessage/sendSingle";

	@Value("${route.cc.url.batch.send}")
	private String sendBatchMessageApiUrl = "/sendMessage/sendBatch";

	@Value("${route.cc.port}")
	private String sendMessageApiDefaultPort = "80";

	@Autowired
	private StringRedisTemplate businessRedisTemplate;

	private final Map<String, DeferredResult<BaseResult>> messageRequests = new ConcurrentHashMap<String, DeferredResult<BaseResult>>();

	@Override
	public BaseResult messageSend(String bizType, String uid, String deviceDomain, String deviceId,
			String messageJsonStr) {
		BaseResult result = new BaseResult();
		if (StringUtils.isBlank(bizType) || StringUtils.isBlank(uid)) {
			result.setSuccese(false);
			result.setMsg("params can not null");
			return result;
		}

		if (StringUtils.isNotBlank(deviceId)) {
			TianxiaoConnectSession session = null;
			String sessionKey = (String) CacheUtil.getValue(bizType + deviceId);
			if (StringUtils.isNotBlank(sessionKey)) {
				session = getSessionFromCache(sessionKey);
				if (session != null) {
					return messageSend(session, messageJsonStr);
				}

			}
			result.setSuccese(false);
			result.setMsg("user not online");
			return result;
		}

		String sessionKey = TxCcSessionKeyUtil.createTxSessionKeyUserId(bizType, uid);
		log.info("[Redis] Get keys.key={}", sessionKey + "*");
		long start = System.currentTimeMillis();
		Set<String> keys = CacheUtil.getkeys(sessionKey + "*");
		log.info("[Redis] Get key cost={}", (System.currentTimeMillis() - start));
		// String sessionKey = "tx-"+bizType+"-"+uid+"-"+deviceDomain;
		if (keys == null || keys.size() == 0) {
			result.setSuccese(false);
			result.setMsg("user not online");
			return result;
		}
		boolean sendSuccese = false;
		String tempMsg = "";
		for (String key : keys) {
			TianxiaoConnectSession session = getSessionFromCache(key);
			if (session != null) {
				BaseResult resultSingle = messageSend(session, messageJsonStr);
				if (resultSingle.isSuccese()) {
					sendSuccese = true;
				} else {
					tempMsg += resultSingle.getMsg() + "|";
				}
			}

		}
		if (sendSuccese) {
			result.setSuccese(true);
		} else {
			result.setSuccese(false);
			result.setMsg(tempMsg);
		}

		return result;
		/*
		 * TianxiaoConnectSession session = getSessionFromCache(sessionKey);
		 * if(session == null){ result.setSuccese(false);
		 * result.setMsg("user not online"); return result; }
		 * 
		 * RestTemplate restTemplate = new RestTemplate(); MultiValueMap<String,
		 * Object> params = new LinkedMultiValueMap<String, Object>();
		 * 
		 * 
		 * params.add("sessionKey", sessionKey); params.add("message",
		 * messageJsonStr);
		 * 
		 * String uri=
		 * "http://"+session.getSessionIp()+":"+sendMessageApiDefaultPort+
		 * sendMessageApiUrl; BaseApiResult sendresult =
		 * restTemplate.postForObject(uri, params, BaseApiResult.class); //
		 * 失败重发消息一次，有可能缓存更新不准确导致，所以重新获取缓存发送，以后会精确判断返回code只有是用户不存在的时候才重新检测发送
		 * if(!StringUtils.equals("1",sendresult.getCode())){ session =
		 * this.getSessionFromCache(sessionKey); if(session!=null){ uri=
		 * "http://"+session.getSessionIp()+":"+sendMessageApiDefaultPort+
		 * sendMessageApiUrl; sendresult = restTemplate.postForObject(uri,
		 * params, BaseApiResult.class); } }
		 * 
		 * if(!StringUtils.equals("1",sendresult.getCode())){
		 * //两次送达不到移除此session，这里应该通知对应的cc系统去移除，暂时简化一下，后面再详细补充代码
		 * removeSessionFromCache(sessionKey);
		 * 
		 * result.setSuccese(false); result.setMsg(sendresult.getMsg()); return
		 * result; } result.setSuccese(true); return result;
		 */
	}

	private TianxiaoConnectSession getSessionFromCache(String sessionKey) {
		Object o = CacheUtil.getValue(sessionKey);
		if (o == null) {
			return null;
		}
		return (TianxiaoConnectSession) o;
		/*
		 * String jsonStr = null; try { jsonStr = (String) o;
		 * TianxiaoConnectSession session = JSON.parseObject(jsonStr,
		 * TianxiaoConnectSession.class); return session; } catch (Exception e)
		 * { logger.error("cache Serializable exception ",e); return null; }
		 */
	}

	private void removeSessionFromCache(String sessionKey) {
		CacheUtil.delete(sessionKey);
	}

	@Override
	public BaseResult messageSend(TianxiaoConnectSession session, String messageJsonStr) {
		BaseResult result = new BaseResult();

		RestTemplate restTemplate = new RestTemplate();
		MultiValueMap<String, Object> params = new LinkedMultiValueMap<String, Object>();

		params.add("sessionKey", session.getSessionKey());
		params.add("message", messageJsonStr);

		String uri = "http://" + session.getSessionIp() + sendMessageApiUrl;
		log.info("messageSend url=" + uri);
		BaseApiResult sendresult = restTemplate.postForObject(uri, params, BaseApiResult.class);
		// 失败重发消息一次，有可能缓存更新不准确导致，所以重新获取缓存发送，以后会精确判断返回code只有是用户不存在的时候才重新检测发送
		if (!StringUtils.equals("1", sendresult.getCode())) {
			session = this.getSessionFromCache(session.getSessionKey());
			if (session != null) {
				uri = "http://" + session.getSessionIp() + sendMessageApiUrl;
				sendresult = restTemplate.postForObject(uri, params, BaseApiResult.class);
			}
		}

		if (!StringUtils.equals("1", sendresult.getCode())) {
			// 两次送达不到移除此session，这里应该通知对应的cc系统去移除，暂时简化一下，后面再详细补充代码
			removeSessionFromCache(session.getSessionKey());

			result.setSuccese(false);
			result.setMsg(sendresult.getMsg());
			return result;
		}
		result.setSuccese(true);
		return result;
	}

	public BaseResult sendMessageFromDubble(TianxiaoConnectSession session, String messageJsonStr) {
		BaseResult result = new BaseResult();
		/*
		 * MessageApi messageApi = getServiceByIp(session.getSessionIp());
		 * if(messageApi!=null){ MessageApiResult apiresult =
		 * messageApi.sendSingle(session.getSessionKey(), messageJsonStr);
		 * if(apiresult.isSuccese()){ result.setSuccese(true); }
		 * result.setMsg(apiresult.getMsg()); }
		 */
		return result;
	}

	/*
	 * public MessageApi getServiceByIp(String ip){ String url =
	 * "dubbo://"+ip+":21880/com.demo.service.DemoService";//更改不同的Dubbo服务暴露的ip地址
	 * &端口
	 * 
	 * ReferenceBean referenceBean = new ReferenceBean();
	 * referenceBean.setInterface(MessageApi.class); referenceBean.setUrl(url);
	 * 
	 * try { referenceBean.afterPropertiesSet(); MessageApi demoService =
	 * (MessageApi)referenceBean.get(); return demoService; } catch (Exception
	 * e) { e.printStackTrace(); return null; } }
	 */

	@Override
	public BaseResult sendMessageFromDubble(String bizType, String uid, String deviceDomain, String messageJsonStr) {
		BaseResult result = new BaseResult();
		if (StringUtils.isBlank(bizType) || StringUtils.isBlank(uid) || StringUtils.isBlank(deviceDomain)) {
			result.setSuccese(false);
			result.setMsg("params can not null");
			return result;
		}

		String sessionKey = "tx-" + bizType + "-" + uid + "-" + deviceDomain;

		TianxiaoConnectSession session = getSessionFromCache(sessionKey);
		if (session == null) {
			result.setSuccese(false);
			result.setMsg("user not online");
			return result;
		}
		return sendMessageFromDubble(session, messageJsonStr);
	}

	@Deprecated
	@Override
	public BaseResult messageSendByWSChannel(String bizType, String uid, String deviceDomain, String messageJsonStr) {
		BaseResult result = new BaseResult();
		if (StringUtils.isBlank(bizType) || StringUtils.isBlank(uid) || StringUtils.isBlank(deviceDomain)) {
			result.setSuccese(false);
			result.setMsg("params can not null");
			return result;
		}

		String sessionKey = "tx-" + bizType + "-" + uid + "-" + deviceDomain;

		TianxiaoConnectSession session = getSessionFromCache(sessionKey);
		if (session == null) {
			result.setSuccese(false);
			result.setMsg("user not online");
			return result;
		}
		return messageSendByWSChannel(session, messageJsonStr);
	}

	@Deprecated
	@Override
	public BaseResult messageSendByWSChannel(TianxiaoConnectSession session, String messageJsonStr) {
		BaseResult result = new BaseResult();

		return result;
	}

	private DeferredResult<BaseResult> postMsg(DefaultCCMessage message) {

		final String msgId = message.getMessageId();
		// 2000毫秒超时
		DeferredResult<BaseResult> dr = new DeferredResult<BaseResult>(2000L);
		dr.onTimeout(new Runnable() {
			@Override
			public void run() {
				messageRequests.remove(msgId);
			}
		});

		dr.onCompletion(new Runnable() {
			@Override
			public void run() {
				messageRequests.remove(msgId);
			}
		});

		return dr;
	}

	@Override
	public BaseResult messageSendByUidPrefix(String bizType, String userIdPrefix, String messageJsonStr) {
		BaseResult result = new BaseResult();
		if (StringUtils.isBlank(bizType) || StringUtils.isBlank(userIdPrefix)) {
			result.setSuccese(false);
			result.setMsg("params can not null");
			return result;
		}

		String sessionKey = TxCcSessionKeyUtil.createTxSessionKeyUserIdPrefix(bizType, userIdPrefix);
		log.info("[Redis] Get keys.key={}", sessionKey + "*");
		long start = System.currentTimeMillis();
		Set<String> keys = CacheUtil.getkeys(sessionKey + "*");
		log.info("[Redis] Get key cost={}", (System.currentTimeMillis() - start));
		if (keys == null || keys.size() == 0) {
			result.setSuccese(false);
			result.setMsg("user no one online");
			return result;
		}
		boolean sendSuccese = false;
		String tempMsg = "";
		for (String key : keys) {
			TianxiaoConnectSession session = getSessionFromCache(key);
			if (session != null) {
				BaseResult resultSingle = messageSend(session, messageJsonStr);
				if (resultSingle.isSuccese()) {
					sendSuccese = true;
				} else {
					tempMsg += resultSingle.getMsg() + "|";
				}
			}

		}
		if (sendSuccese) {
			result.setSuccese(true);
		} else {
			result.setSuccese(false);
			result.setMsg(tempMsg);
		}

		return result;
	}

	private String buildMsg(String bodyStr) {
		JSONObject json = new JSONObject();

		json.put("message_type", "new_messages");
		json.put("sign", "");
		// String bodyStr =
		// "{\"payload\":{\"action\":\"tianxiao-crm-msgs\",\"list\":[{\"type\":3,\"msg\":{\"msgType\":0,\"msgId\":5378,\"sender\":4073,\"receiver\":331781449,\"time\":1463196002592,\"content\":{\"text\":\"hello,QQ\",\"url\":\"\",\"width\":0,\"height\":0,\"length\":0,\"callId\":0}},\"user\":{\"consultUserId\":4073,\"intentLevel\":0,\"mobile\":\"15600009999\",\"name\":\"??????\",\"userRole\":2}}]},\"type\":1}";
		JSONObject messageBody = JSON.parseObject(
				"{\"payload\":{\"action\":\"tianxiao-crm-msgs\",\"list\":[{\"type\":3,\"msg\":{\"msgType\":0,\"msgId\":5378,\"sender\":4073,\"receiver\":331781449,\"time\":1463196002592,\"content\":{\"text\":\"aabbbbbbbb\",\"url\":\"\",\"width\":0,\"height\":0,\"length\":0,\"callId\":0}},\"user\":{\"consultUserId\":4073,\"intentLevel\":0,\"mobile\":\"15600009999\",\"name\":\"??????\",\"userRole\":2}}]},\"type\":1}");

		JSONObject response = new JSONObject();
		JSONArray msgs = new JSONArray();
		JSONObject msg = new JSONObject();
		msgs.add(msg);
		msg.put("body", bodyStr);
		msg.put("msg_t", "7");

		response.put("code", 0);
		response.put("msg", "");
		response.put("ts", 0);
		JSONObject data = new JSONObject();
		data.put("msgs", msgs);
		response.put("data", data);
		json.put("response", response.toJSONString());
		// System.out.println(json.toJSONString());
		// params.add("message", json.toJSONString());
		return json.toJSONString();
	}

	@Override
	public BaseResult asyncMessageSend(String bizType, String uid, String deviceDomain, String deviceId,
			String messageJsonStr) {
		BaseResult result = new BaseResult();
		if (StringUtils.isBlank(bizType) || StringUtils.isBlank(uid)) {
			result.setSuccese(false);
			result.setMsg("params can not null");
			return result;
		}

		if (StringUtils.isNotBlank(deviceId)) {
			TianxiaoConnectSession session = null;
			String sessionKey = (String) CacheUtil.getValue(bizType + deviceId);
			if (StringUtils.isNotBlank(sessionKey)) {
				session = getSessionFromCache(sessionKey);
				if (session != null) {
					return messageSend(session, messageJsonStr);
				}

			}
			result.setSuccese(false);
			result.setMsg("user not online");
			return result;
		}

		String sessionKey = TxCcSessionKeyUtil.createTxSessionKeyUserId(bizType, uid);
		log.info("[Redis] Get keys.key={}", sessionKey + "*");
		long start = System.currentTimeMillis();
		Set<String> keys = CacheUtil.getkeys(sessionKey + "*");
		log.info("[Redis] Get key cost={}", (System.currentTimeMillis() - start));
		// String sessionKey = "tx-"+bizType+"-"+uid+"-"+deviceDomain;
		if (keys == null || keys.size() == 0) {
			result.setSuccese(false);
			result.setMsg("user not online");
			return result;
		}
		boolean sendSuccese = false;
		String tempMsg = "";
		for (String key : keys) {
			TianxiaoConnectSession session = getSessionFromCache(key);
			if (session != null) {
				BaseResult resultSingle = asyncMessageSend(session, messageJsonStr);
				if (resultSingle.isSuccese()) {
					sendSuccese = true;
				} else {
					tempMsg += resultSingle.getMsg() + "|";
				}
			}

		}
		if (sendSuccese) {
			result.setSuccese(true);
		} else {
			result.setSuccese(false);
			result.setMsg(tempMsg);
		}

		return result;
	}

	@Override
	public BaseResult asyncMessageSend(TianxiaoConnectSession session, String messageJsonStr) {
		BaseResult result = new BaseResult();

		MultiValueMap<String, Object> params = new LinkedMultiValueMap<String, Object>();
		params.add("sessionKey", session.getSessionKey());
		params.add("message", messageJsonStr);

		String uri = "http://" + session.getSessionIp() + sendMessageApiUrl;
		log.info("[asyncMessageSend url]:{}", uri);

		try {
			Future<HttpResponse<JsonNode>> future = Unirest.post(uri).body(params)
					.asJsonAsync(new Callback<JsonNode>() {

						public void completed(HttpResponse<JsonNode> response) {
							int code = response.getStatus();
							JsonNode body = response.getBody();
							log.info("[asyncMessageSend success] code:{},result:{}", code, body);
						}

						public void failed(UnirestException e) {
							log.error("[asyncMessageSend UnirestException]", e);
						}

						public void cancelled() {
							log.info("[asyncMessageSend canceled!]");
						}
					});
		} catch (Exception e) {
			log.error("[asyncMessageSend Exception]", e);
		}
		result.setSuccese(true);
		return result;
	}

	@Override
	public BaseResult asyncMessageSendBatch(String bizType, String uid, String deviceType, String equipmentDomain,
			List<String> deviceIds, String messageJsonStr) {
		BaseResult result = new BaseResult();
		if (StringUtils.isBlank(bizType) || StringUtils.isBlank(uid)) {
			result.setSuccese(false);
			result.setMsg("params can not null");
			return result;
		}
		List<String> sessionKeys = Lists.newArrayList();
		for (String deviceId : deviceIds) {
			String sessionKey = TxCcSessionKeyUtil.createTxSessionKey(bizType, uid,
					String.valueOf(DeviceDomainEnum.MOBILE.getCode()), deviceType, deviceId);
			sessionKeys.add(sessionKey);
		}
		// 批量发sessionKeys给cc
		BaseResult resultBatch = asyncMessageSendBatch(sessionKeys, messageJsonStr);
		if (resultBatch.isSuccese()) {
			result.setSuccese(true);
		} else {
			result.setSuccese(false);
			result.setMsg(resultBatch.getMsg());
		}

		return result;
	}

	@Override
	public BaseResult asyncMessageSendBatch(List<String> sessionKeys, String messageJsonStr) {
		BaseResult result = new BaseResult();

		Map<String, Object> params = new HashMap<String, Object>();
		params.put("message", messageJsonStr);
		StringBuilder sessionKeysStr = new StringBuilder();
		for(Object object : sessionKeys){
			sessionKeysStr.append(object.toString()).append(",");
		}
		params.put("sessionKeys", sessionKeysStr.deleteCharAt(sessionKeysStr.length()-1).toString());
		// 向每一个cc发同样请求，cc遍历自己map，有则发，无则忽略
		// cc服务器地址，由cc启动时，自己写入redis，这里只读
		List<String> hosts = businessRedisTemplate.opsForList().range(Constant.CC_SERVER_URL_KEY, 0,
				businessRedisTemplate.opsForList().size(Constant.CC_SERVER_URL_KEY) - 1);
		Set<String> exists = Sets.newHashSet();
		if (CollectionUtils.isNotEmpty(hosts)) {
			for (String host : hosts) {
				if(exists.contains(host)){
					continue;
				}else{
					exists.add(host);
				}
				String uri = "http://" + host + sendBatchMessageApiUrl;
				log.info("[asyncMessageSendBatch url]:{}", uri);
				try {
					Future<HttpResponse<JsonNode>> future = Unirest.post(uri).fields(params)
							.asJsonAsync(new Callback<JsonNode>() {

								public void completed(HttpResponse<JsonNode> response) {
									int code = response.getStatus();
									JsonNode body = response.getBody();
									log.info("[asyncMessageSendBatch success] code:{},result:{}", code, body);
								}

								public void failed(UnirestException e) {
									log.error("[asyncMessageSendBatch UnirestException]", e);
								}

								public void cancelled() {
									log.info("[asyncMessageSendBatch canceled!]");
								}
							});
				} catch (Exception e) {
					log.error("[asyncMessageSendBatch Exception]", e);
				}
			}
		}
		result.setSuccese(true);
		return result;
	}
}
